--- title: Data Augmentation for Audio keywords: fastai sidebar: home_sidebar summary: "Transforms to apply data augmentation to AudioSpectrograms and Signals" description: "Transforms to apply data augmentation to AudioSpectrograms and Signals" ---
{% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}
{% endraw %}

Setup Examples

{% raw %}
#p = Config()['data_path'] / 'ST-AEDS-20180100_1-OS'
#untar_data(URLs.SPEAKERS10, fname=str(p)+'.tar', dest=p)
p = untar_data(URLs.SPEAKERS10, extract_func=tar_extract_at_filename)
x = AudioGetter("", recurse=True, folders=None)
files = x(p)
{% endraw %} {% raw %}
#files will load differently on different machines so we specify examples by name
ex_files = [p/f for f in ['m0005_us_m0005_00218.wav', 
                                'f0003_us_f0003_00279.wav', 
                                'f0001_us_f0001_00168.wav', 
                                'f0005_us_f0005_00286.wav',]]
{% endraw %} {% raw %}
#sc= single channel, mc = multichannel
@docs
class GenExample:
    "Generate individual or batch of single/multichannel AudioTensors and AudioSpectrograms for testing"


    @staticmethod
    def audio_sc(): return AudioTensor.create(ex_files[0])

    @staticmethod
    def audio_mc():
        #get 3 equal length portions of 3 different signals so we can stack them
        #for a fake multichannel example
        ai0, ai1, ai2 = map(AudioTensor.create, ex_files[1:4])
        min_samples = min(ai0.nsamples, ai1.nsamples, ai2.nsamples)
        #truncate every channel to the shortest signal so torch.cat shapes match
        s0, s1, s2 = (x[:, :min_samples] for x in (ai0, ai1, ai2))
        return AudioTensor(torch.cat((s0, s1, s2), dim=0), 16000)

    @staticmethod
    def audio_sc_batch(bs=8):
        return AudioTensor(torch.stack([AudioTensor.create(ex_files[0]) for i in range(bs)]), 16000)

    @staticmethod
    def audio_mc_batch(bs=8):
        return AudioTensor(torch.stack([GenExample.audio_mc() for i in range(bs)]), 16000)

    @staticmethod
    def sg_sc():
        DBMelSpec = SpectrogramTransformer(mel=True, to_db=True)
        a2s = DBMelSpec(n_fft=1024, hop_length=256)
        return a2s(GenExample.audio_sc())

    @staticmethod
    def sg_mc():
        DBMelSpec = SpectrogramTransformer(mel=True, to_db=True)
        a2s = DBMelSpec(n_fft=1024, hop_length=256)
        return a2s(GenExample.audio_mc())

    _docs=dict(audio_sc="Generate a single-channel audio", 
               audio_mc="Generate a multi-channel audio", 
               audio_sc_batch="Generate a batch of single-channel audios", 
               audio_mc_batch="Generate a batch of multi-channel audios",
               sg_sc="Generate a spectrogram of a single-channel audio",
               sg_mc="Generate a spectrogram of a multi-channel audio",
               #sg_sc_batch="Generate a batch of spectrograms of a single-channel audios",
              )
{% endraw %}
Warning: calling inp.clone() does not copy its attributes, such as sr in the case of an AudioTensor. Since transforms mutate in place, any testing that requires the input sample rate must store the sample rate before the transform so that it may be tested afterwards. See the tests in resampling for an example
{% raw %}
def show_transform(transform, gen_input=GenExample.audio_sc, show=True):
    '''Generate a new input, apply transform, and display/return both input and output'''
    sample = gen_input()
    # keep an untouched copy: transforms mutate in place, so `sample` is altered below
    original = sample.clone()
    if show is not None:
        sample.show() if show else sample.hear()
    # RandTransforms need split_idx=0 to force the "training" branch so they actually fire
    if isinstance(transform, RandTransform):
        result = transform(sample, split_idx=0)
    else:
        result = transform(sample)
    if show is not None:
        result.show() if show else result.hear()
    return original, result
{% endraw %} {% raw %}
aud_ex = GenExample.audio_sc()
aud_mc_ex = GenExample.audio_mc()
aud_batch = GenExample.audio_sc_batch(4)
aud_mc_batch = GenExample.audio_mc_batch(8)
test_eq(type(aud_ex), AudioTensor)
test_eq(type(aud_batch), AudioTensor)
test_eq(aud_batch.shape, torch.Size([4, 1, 58240]))
test_eq(aud_mc_batch.shape, torch.Size([8, 3, 53760]))
{% endraw %}

Preprocessing Functions

TO-DO:
1. Add in longer clips (whale) and do more extensive testing. Current clip only allows us to test Trim, not All or Split

Remove Silence

{% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}

RemoveSilence[source]

RemoveSilence(remove_type='trim', threshold=20, pad_ms=20)

{% endraw %}

Trim Silence

{% raw %}
silencer = RemoveSilence(threshold=20, pad_ms=20)
orig, silenced = show_transform(silencer, GenExample.audio_sc)
# test that at least a half second of silence is being removed
test(silenced.nsamples + 8000, orig.nsamples, operator.le)
{% endraw %} {% raw %}
#test that nothing is removed from audio that doesnt contain silence
audio_orig = GenExample.audio_sc()
test_aud = AudioTensor(torch.rand_like(audio_orig), 16000)
orig_samples = test_aud.nsamples
print(f"Random Noise, no silence, {orig_samples} samples")
test_aud.hear()

for rm_type in [RemoveType.All, RemoveType.Trim, RemoveType.Split]:
    silence_audio_trim = RemoveSilence(rm_type, threshold=20, pad_ms=20)(test_aud)
    print(f"After silence remove {rm_type}, {silence_audio_trim.nsamples} samples")
    test_eq(orig_samples, silence_audio_trim.nsamples)
Random Noise, no silence, 58240 samples
After silence remove all, 58240 samples
After silence remove trim, 58240 samples
After silence remove split, 58240 samples
{% endraw %} {% raw %}
# trim silence from a multichannel clip, needs more extensive testing
silence_mc = RemoveSilence(threshold=20, pad_ms=20)
inp, out = show_transform(silence_mc, GenExample.audio_mc, show=False)
{% endraw %}

Trim Silence Timing Tests

{% raw %}
silencer = RemoveSilence(threshold=20, pad_ms=20)
audio_sc = GenExample.audio_sc()
audio_mc = GenExample.audio_mc()
{% endraw %} {% raw %}
%%timeit -n10
silencer(audio_sc)
1.63 ms ± 92.1 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %} {% raw %}
%%timeit -n10
silencer(audio_mc)
1.64 ms ± 40.5 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %}

Resampling

{% raw %}
{% endraw %} {% raw %}

Resample[source]

Resample(sr_new)

{% endraw %} {% raw %}
#Make sure if old and new sample rates are the same, a new identical AudioTensor is returned
no_resample_needed = Resample(audio_orig.sr)
inp, out = show_transform(no_resample_needed, GenExample.audio_sc)
test_eq(inp.sr, out.sr)
test_eq(inp, out)
{% endraw %} {% raw %}
#test and hear realistic sample rates
audio_orig = GenExample.audio_sc()
orig_sr = audio_orig.sr
print(f"Original Sample Rate {orig_sr} \n")
for rate in [2000,4000,8000,22050,44100]:
    resampler = Resample(rate)
    print("Sample Rate", rate)
    inp, out = show_transform(resampler, GenExample.audio_sc, show=False)
    test_eq(out.nsamples, inp.nsamples//(orig_sr/rate))
Original Sample Rate 16000 

Sample Rate 2000
Sample Rate 4000
Sample Rate 8000
Sample Rate 22050
Sample Rate 44100
{% endraw %} {% raw %}
#resample a multichannel audio
resampler = Resample(8000)
inp, out = show_transform(resampler, GenExample.audio_mc, show=False)
test_eq(inp.nsamples//2, out.nsamples)
test_eq(inp.nchannels, out.nchannels)
test_eq(out.sr, 8000)
{% endraw %} {% raw %}
for i in range(100):
    audio_orig = GenExample.audio_sc()
    random_sr = random.randint(16000, 72000)
    random_upsample = Resample(random_sr)(audio_orig)
    num_samples = random_upsample.nsamples
    test_close(num_samples, abs(audio_orig.nsamples//(audio_orig.sr/random_sr)), eps=1.1)
100.00% [100/100 00:08<00:00]
{% endraw %}

Resample Timing Tests

{% raw %}
# Polyphase resampling's speed is dependent on the GCD between old and new rate. For almost all used sample rates it
# will be very fast and much better than any FFT based method. It is slow however in the unlikely event that the 
# GCD is small (demonstrated below w GCD of 1 for last 2 examples)
common_downsample = Resample(8000)
slow_downsample = Resample(8001)
slow_upsample = Resample(27101)
audio_sc = GenExample.audio_sc()
audio_mc = GenExample.audio_mc()
{% endraw %} {% raw %}
%%time
common_downsample(audio_sc)
CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 2.66 ms
AudioTensor([[ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ..., -2.7515e-04,
         -2.1502e-04, -3.7679e-05]])
{% endraw %} {% raw %}
%%time
common_downsample(audio_mc)
CPU times: user 28 ms, sys: 0 ns, total: 28 ms
Wall time: 6.83 ms
AudioTensor([[ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ..., -1.7625e-03,
         -1.2562e-03, -6.0365e-04],
        [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ..., -3.1290e-05,
         -3.8538e-05,  1.0710e-04],
        [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  4.9272e-05,
          3.9761e-05,  8.9849e-05]])
{% endraw %} {% raw %}
%%time
slow_downsample(audio_sc)
CPU times: user 100 ms, sys: 4 ms, total: 104 ms
Wall time: 26.5 ms
AudioTensor([[ 1.3499e-23,  2.6595e-12, -1.2066e-11,  ..., -2.6653e-04,
         -1.5343e-04, -3.0850e-06]])
{% endraw %} {% raw %}
%%time
slow_upsample(audio_mc)
CPU times: user 144 ms, sys: 4 ms, total: 148 ms
Wall time: 97.4 ms
AudioTensor([[ 2.7587e-24,  1.1815e-09,  1.7938e-09,  ..., -3.7292e-04,
         -1.7601e-04, -3.4302e-05],
        [-9.0046e-25,  2.0666e-09,  3.0623e-09,  ...,  9.8098e-05,
          6.0099e-05,  1.4080e-05],
        [ 1.8593e-24,  3.2510e-09,  4.8764e-09,  ...,  6.8579e-05,
          3.7435e-05,  8.0894e-06]])
{% endraw %}

Signal Transforms

Signal Cropping/Padding

CropSignal and CropTime can either be merged into one function, or they can outsource the bulk of their behavior to a shared cropping function
{% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}

CropSignal[source]

CropSignal(duration, pad_mode='zeros')

{% endraw %} {% raw %}
{% endraw %} {% raw %}
inp, out1000 = show_transform(CropSignal(1000), GenExample.audio_sc, show=None)
inp, out2000 = show_transform(CropSignal(2000), GenExample.audio_sc, show=None)
inp, out5000 = show_transform(CropSignal(5000), GenExample.audio_sc, show=None)
{% endraw %} {% raw %}
print(f"Original Audio is {inp.duration:.2f} seconds")
inp.show()
out1000.show()
out2000.show()
out5000.show()
Original Audio is 3.64 seconds
{% endraw %} {% raw %}
test_eq(out1000.nsamples, 1*inp.sr)
test_eq(out2000.nsamples, 2*inp.sr)
test_eq(out5000.nsamples, 5*inp.sr)
test_eq(out1000.duration, 1)
test_eq(out2000.duration, 2)
test_eq(out5000.duration, 5)
{% endraw %} {% raw %}
inp, mc1000 = show_transform(CropSignal(1000), GenExample.audio_mc, show=None)
inp, mc2000 = show_transform(CropSignal(2000), GenExample.audio_mc, show=None)
inp, mc5000 = show_transform(CropSignal(5000), GenExample.audio_mc, show=None)
test_eq(mc1000.duration, 1)
test_eq(mc2000.duration, 2)
test_eq(mc5000.duration, 5)
{% endraw %}

Test Signal Padding Modes

{% raw %}
# test pad_mode zeros-after
audio_orig = GenExample.audio_sc()
cropsig_pad_after = CropSignal(5000, pad_mode=AudioPadType.Zeros_After)
# generate a random input signal that is 3s long
gen_func = lambda: AudioTensor(torch.rand(1, 48000), 16000)
inp, out = show_transform(cropsig_pad_after, gen_func)
# test end of signal is padded with zeros
test_eq(out[:,-10:], torch.zeros_like(out)[:,-10:])
# test front of signal is not padded with zeros
test_ne(out[:,0:10] , out[:,-10:])
{% endraw %} {% raw %}
# test pad_mode zeros by verifying signal begins and ends with zeros
cropsig_pad = CropSignal(5000)
inp, out = show_transform(cropsig_pad, GenExample.audio_sc)
test_eq(out[:,0:2], out[:,-2:])
{% endraw %} {% raw %}
# test pad_mode repeat by making sure that columns are equal at the appropriate offsets
cropsig_repeat = CropSignal(12000, pad_mode=AudioPadType.Repeat)
inp, out = show_transform(cropsig_repeat, GenExample.audio_sc)
for i in range(inp.nsamples):
    test_eq(out[:,i], out[:,i+inp.nsamples])
    test_eq(out[:,i], out[:,i+2*inp.nsamples])
{% endraw %} {% raw %}
# test bad pad_mode doesnt fail silently
test_fail(CropSignal(12000, pad_mode="tenchify"))
{% endraw %} {% raw %}
# demonstrate repeat mode works on multichannel data (change "show=None" to "True" to see output)
inp, out = show_transform(cropsig_repeat, GenExample.audio_mc, show=None)
{% endraw %}

Cropping/Padding Timing Tests

{% raw %}
%%time
aud1s = CropSignal(1000)(audio_orig)
{% endraw %} {% raw %}
%%time
aud2s = CropSignal(2000)(audio_orig)
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 206 µs
{% endraw %} {% raw %}
%%time
aud5s = CropSignal(5000)(audio_orig)
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 211 µs
{% endraw %}

Signal Shifting

{% raw %}
#v1 used scipy.ndimage.interpolation.shift but it was extremely slow (14-16ms) so I rewrote and got it down to 50µs
{% endraw %} {% raw %}
np.roll(np.array([1,2,3,4,5,6,7]), 2)
{% endraw %} {% raw %}
# version before optimization
# def _shift(sig, s):
#     samples = sig.shape[-1]
#     if   s == 0: return sig
#     elif  s < 0: return torch.cat([sig[...,-1*s:], torch.zeros_like(sig)[...,s:]], dim=-1)
#     else       : return torch.cat([torch.zeros_like(sig)[...,:s], sig[...,:samples-s]], dim=-1)

# def shift_signal(t:torch.Tensor, shift, roll):
#     #refactor 2nd half of this statement to just take and roll the final axis
#     if roll: t.data = torch.from_numpy(np.roll(t.numpy(), shift, axis=-1))
#     else   : t.data = _shift(t, shift)
#     return t
{% endraw %} {% raw %}
{% endraw %} {% raw %}

shift_signal[source]

shift_signal(t:Tensor, shift, roll)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

class SignalShifter[source]

SignalShifter(p=0.5, max_pct=0.2, max_time=None, direction=0, roll=False) :: RandTransform

A transform that before_call its state at each __call__

{% endraw %} {% raw %}
t1 = torch.tensor([[1,2,3,4,5,6,7,8,9,10]])
t3 = torch.tensor([[1,2,3,4,5,6,7,8,9,10],[11,12,13,14,15,16,17,18,19,20],[21,22,23,24,25,26,27,28,29,30]])
b4 = torch.stack([t3,t3,t3,t3])
test_eq(b4.shape, torch.Size([4, 3, 10]))
test_eq(_shift(t1,4), tensor([[0, 0, 0, 0, 1, 2, 3, 4, 5, 6]]))
test_eq(_shift(t3,-2), tensor([[3,4,5,6,7,8,9,10,0,0],[13,14,15,16,17,18,19,20,0,0],[23,24,25,26,27,28,29,30,0,0]]))
{% endraw %} {% raw %}
shift_signal(b4, 4, roll=False)
{% endraw %} {% raw %}
shifter = SignalShifter(p=1, max_pct=0.5)
inp, out = show_transform(shifter, GenExample.audio_sc)
{% endraw %} {% raw %}
inp, out = show_transform(shifter, GenExample.sg_sc)
{% endraw %} {% raw %}
audio_orig = GenExample.audio_sc_batch(8)
shifter = SignalShifter(p=1, max_pct=1)
AudioTensor(audio_orig[0], 16000).show()
altered = shifter(audio_orig, split_idx=0)

#AudioTensor((audio_orig[0], 16000, None)).show()
print(altered.shape)
for sig in altered:
    AudioTensor(sig, 16000).show()
{% endraw %} {% raw %}
audio_orig = GenExample.audio_sc()
sg_orig = GenExample.sg_sc()
{% endraw %} {% raw %}
%%time
altered = shifter(audio_orig, split_idx=0)
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 305 µs
{% endraw %} {% raw %}
%%timeit -n3
altered = shifter(audio_orig, split_idx=0)
154 µs ± 36.7 µs per loop (mean ± std. dev. of 7 runs, 3 loops each)
{% endraw %} {% raw %}
audio_orig = GenExample.audio_sc_batch(32)
{% endraw %} {% raw %}
%%time
altered = shifter(audio_orig, split_idx=0)
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 547 µs
{% endraw %} {% raw %}
%%time
altered = shifter(sg_orig, split_idx=0)
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 280 µs
{% endraw %}

Example without rolling

{% raw %}
shifter = SignalShifter(p=1, max_pct=0.5)
inp, out = show_transform(shifter, GenExample.audio_sc)
{% endraw %} {% raw %}
# test a time shift of 1s never shifts more than 1s
for i in range(100):
    time_shifter = SignalShifter(p=1, max_time=1)
    gen_func = lambda: AudioTensor(torch.ones(1, 16000), 16000)
    inp, out = show_transform(time_shifter, gen_func, show=None)
    #just_ones = AudioTensor((torch.ones(16000).unsqueeze(0), 16000, None))
    test_eq(False, torch.allclose(out, torch.zeros(16000)))
{% endraw %} {% raw %}
# demonstrate shifting works on multichannel data (alter show to True to see)
shifter = SignalShifter(p=1, max_time=1)
inp, out = show_transform(shifter, GenExample.audio_mc, show=None)
{% endraw %}

Example with rolling

{% raw %}
shift_and_roll = SignalShifter(p=1, max_pct=0.5, roll=True)
inp, out = show_transform(shift_and_roll, GenExample.audio_sc)
test_eq(inp.data.shape, out.data.shape)
{% endraw %}

Shift Timing Tests

{% raw %}
audio_orig = GenExample.audio_sc()
{% endraw %} {% raw %}
%%time
shifted = shifter(audio_orig, split_idx=0)
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 341 µs
{% endraw %} {% raw %}
%%time
shifted = shift_and_roll(audio_orig, split_idx=0)
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 318 µs
{% endraw %}

Add Noise to Signal

Adds noise proportional to the energy of the signal (mean of abs value), and the specified noise level.

This uses colorednoise(imported as 'cn'), developed by a data scientist named Felix Patzelt. It allows you to use one simple function to create white, brown, pink and other colors of noise. Each color corresponds to an exponent, violet is -2, blue -1, white is 0, pink is 1, and brown is 2. We abstract this with a class that enumerates the list and shifts it down by two so the exponents are correct, and so that we get tab-completion.

Because this actually draws a spectrogram and does an istft on it, it is about 10x faster if we implement our own white noise (simple and worth doing since it's the most common noise we'll want to use, this is what the if color=0 line does, it overrides and generates white noise using our own simple algo. (Note: Most recent timing tests actually show the opposite, that generating our own white noise is 3x slower? This should be looked into later)

For just plain white noise, if we revert to remove the dependency on this library, the noise can be created with
noise = torch.randn_like(ai.sig) * ai.sig.abs().mean() * noise_level

{% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}

AddNoise[source]

AddNoise(noise_level=0.05, color=0)

{% endraw %}

White noise examples (default)

{% raw %}
noisy = AddNoise()
real_noisy = AddNoise(noise_level=0.5)
inp, out5 = show_transform(noisy, GenExample.audio_sc, show=None)
inp, out50 = show_transform(real_noisy, GenExample.audio_sc, show=None)
msgs = ["Original Audio", "5% White Noise(Default)", "50% White Noise"]
for i, aud in enumerate([inp, out5, out50]):
    print(msgs[i])
    aud.show()
{% endraw %}

Pink Noise Examples

{% raw %}
noisy = AddNoise(color=NoiseColor.Pink)
real_noisy = AddNoise(noise_level=1, color=NoiseColor.Pink)
inp, out5 = show_transform(noisy, GenExample.audio_sc, show=None)
inp, out100 = show_transform(real_noisy, GenExample.audio_sc, show=None)
msgs = ["Original Audio", "5% Pink Noise", "100% Pink Noise"]
for i, aud in enumerate([inp, out5, out100]):
    print(msgs[i])
    aud.show()
{% endraw %} {% raw %}
# demonstrate blue-noise on multichannel data (change "show=None" to "show=True" to see)
noisy = AddNoise(noise_level=0.5, color=NoiseColor.Blue)
show_transform(noisy, GenExample.audio_mc, show=None)
#noisy.show()
(AudioTensor([[ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ..., -8.8501e-04,
          -6.7139e-04, -2.4414e-04],
         [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  0.0000e+00,
           1.2207e-04,  1.8311e-04],
         [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  1.5259e-04,
           9.1553e-05,  3.0518e-05]]),
 AudioTensor([[-0.0002,  0.0109, -0.0064,  ..., -0.0015, -0.0102,  0.0026],
         [-0.0002,  0.0109, -0.0064,  ..., -0.0006, -0.0094,  0.0031],
         [-0.0002,  0.0109, -0.0064,  ..., -0.0005, -0.0094,  0.0029]]))
{% endraw %}

Noise Timing Tests

{% raw %}
%%timeit -n10
noise = torch.from_numpy(cn.powerlaw_psd_gaussian(exponent=0, size=audio_orig.nsamples)).float()
scaled_noise = noise * audio_orig.data.abs().mean() * 0.05
out = AudioTensor(audio_orig.data + scaled_noise,audio_orig.sr)
4.41 ms ± 127 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %} {% raw %}
%%timeit -n10
#Same speed for white noise and brown noise using their algorithm
noise = torch.from_numpy(cn.powerlaw_psd_gaussian(exponent=2, size=audio_orig.nsamples)).float()
scaled_noise = noise * audio_orig.abs().mean() * 0.05
out = AudioTensor(audio_orig.data + scaled_noise,audio_orig.sr)
4.64 ms ± 85 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %} {% raw %}
%%timeit -n10
noisy = AddNoise(color=NoiseColor.White)(audio_orig)
524 µs ± 24.2 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %}

Adjust Volume

Note:
This will increase/decrease the energy of the signal but so far it appears to do nothing besides change the absolute values as the audios sound the same, and the spectrograms appear the same. The gain is being correctly applied, but the ipython audio player seems to normalize the volume level (confirmed by outputting and downloading the clips and confirming a difference in noise level). The spectrogram appears the same because it too does a form of normalization when it sets `ref`. We will likely need to adjust the ref value to something constant like np.max or 0 to stop this normalization, as the noise_level is often relevant for deep learning and not something we want to strip out.
{% raw %}
{% endraw %} {% raw %}

AudioTensor.apply_gain[source]

AudioTensor.apply_gain(ai:AudioTensor, gain)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

class ChangeVolume[source]

ChangeVolume(p=0.5, lower=0.5, upper=1.5) :: RandTransform

A transform that before_call its state at each __call__

{% endraw %} {% raw %}
#ipython player normalizes out volume difference, note different y-axis scale but same sound.
volume_adjuster = ChangeVolume(p=1, lower=0.01, upper=0.5)
inp, out = show_transform(volume_adjuster, GenExample.audio_sc)
{% endraw %}

Adjust Volume Timing Tests

{% raw %}
audio_orig = GenExample.audio_sc
audio_mc = GenExample.audio_mc
{% endraw %} {% raw %}
%%timeit -n10
volume_adjuster(audio_orig, split_idx=0)
42.5 µs ± 4.26 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %} {% raw %}
%%timeit -n10
volume_adjuster(audio_mc, split_idx=0)
40.3 µs ± 4.39 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %}

Signal Cutout

{% raw %}
{% endraw %} {% raw %}

AudioTensor.cutout[source]

AudioTensor.cutout(ai:AudioTensor, cut_pct)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

class SignalCutout[source]

SignalCutout(p=0.5, max_cut_pct=0.15) :: RandTransform

A transform that before_call its state at each __call__

{% endraw %} {% raw %}
cutter = SignalCutout(p=1, max_cut_pct=0.3)
inp, out = show_transform(cutter, GenExample.audio_sc)
{% endraw %} {% raw %}
# demonstrate SignalCutout on multichannel, confirm the cuts align, (change "show=None" to "show=True" to see)
cut_mc = SignalCutout(p=1, max_cut_pct=0.5)
inp, out = show_transform(cut_mc, GenExample.audio_mc, show=True)
{% endraw %}

Signal Cutout Timing Tests

{% raw %}
audio_orig = GenExample.audio_sc
audio_mc = GenExample.audio_mc
{% endraw %} {% raw %}
%%timeit -n10
cutter(audio_orig, split_idx=0)
38.7 µs ± 4.33 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %} {% raw %}
%%timeit -n10
cutter(audio_mc, split_idx=0)
39.6 µs ± 3.49 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %}

Signal Loss

{% raw %}
{% endraw %} {% raw %}

AudioTensor.lose_signal[source]

AudioTensor.lose_signal(ai:AudioTensor, loss_pct)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

class SignalLoss[source]

SignalLoss(p=0.5, max_loss_pct=0.15) :: RandTransform

A transform that before_call its state at each __call__

{% endraw %} {% raw %}
dropper = SignalLoss(p=1, max_loss_pct=0.3)
inp, out = show_transform(dropper, GenExample.audio_sc, show=True)
print(f"Percent Dropped: {100*dropper.loss_pct:.2f}")
Percent Dropped: 20.56
{% endraw %} {% raw %}
# Updating to a RandTransform broke these tests

# verify SignalDrop is dropping both the correct number of samples, and dropping
# the same samples from each channel, over a wide range of cut_pcts
# nsamples = fake_multichannel.nsamples
# for cut_pct in np.linspace(0.05, 0.5, 45):
#     dropped_mc = SignalDrop(cut_pct)(fake_multichannel)
#     match1 = (dropped_mc.sig[0] == dropped_mc.sig[1]).sum()
#     match2 = (dropped_mc.sig[0] == dropped_mc.sig[2]).sum()
#     match3 = (dropped_mc.sig[1] == dropped_mc.sig[2]).sum()
#     test_close(match1, cut_pct*nsamples, eps=.02*nsamples)
#     test_close(match2, cut_pct*nsamples, eps=.02*nsamples)
#     test_close(match3, cut_pct*nsamples, eps=.02*nsamples)
{% endraw %}

Signal Drop Timing Tests

{% raw %}
audio_orig = GenExample.audio_sc
audio_mc = GenExample.audio_mc
{% endraw %} {% raw %}
%%timeit -n10
dropper(audio_orig, split_idx=0)
37.4 µs ± 3.46 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %} {% raw %}
%%timeit -n10
dropper(audio_mc, split_idx=0)
39.5 µs ± 3.61 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %}

DownmixMono

{% raw %}
{% endraw %} {% raw %}

DownmixMono[source]

DownmixMono()

{% endraw %} {% raw %}
downmixer = DownmixMono()
inp, out = show_transform(downmixer, GenExample.audio_mc, show=True)
{% endraw %} {% raw %}
# test downmixing 1 channel has no effect
inp, out = show_transform(downmixer, GenExample.audio_sc, show=None)
test_eq(inp.data, out.data)
{% endraw %} {% raw %}
# example showing a batch of 8 signals 
inp, out = show_transform(downmixer, GenExample.audio_mc_batch, show=None)
print(f"Before shape: {inp.shape}\nAfter shape: {out.shape}")
{% endraw %}

DownmixMono Timing Tests

{% raw %}
audio_mc = GenExample.audio_mc()
{% endraw %} {% raw %}
%%timeit -n10
downmixer(audio_mc)
93.1 µs ± 9.86 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
{% endraw %}

Spectrogram Transforms

Time Cropping

TO-DO:
1. In spectrogram when we pad with mean value we mess up normalization by altering std dev, how can we use fill values that dont mess things up
{% raw %}
{% endraw %} {% raw %}

CropTime[source]

CropTime(duration, pad_mode='zeros')

{% endraw %} {% raw %}
{% endraw %} {% raw %}
audio_orig = GenExample.audio_sc()
{% endraw %} {% raw %}
crop_1000ms = CropTime(1000)
crop_2000ms = CropTime(2000)
crop_5000ms = CropTime(5000)
print(f"Audio is {audio_orig.duration} seconds")
{% endraw %} {% raw %}
type(GenExample.sg_sc())
{% endraw %} {% raw %}
orig_settings = dict(GenExample.sg_sc().settings)
inp, out1 = show_transform(crop_1000ms, GenExample.sg_sc, show=None)
inp, out2 = show_transform(crop_2000ms, GenExample.sg_sc, show=None)
inp, out5 = show_transform(crop_5000ms, GenExample.sg_sc, show=None)
for spec in [inp, out1, out2, out5]:
    spec.show()
{% endraw %}
Note:
Because a spectrogram's duration is dependent on rounding (samples/hop_length usually has a remainder that is padded up to an extra pixel), we can't use exact durations, so we must test_close instead of test_eq. This could be fixed by storing the AudioTensor's duration when the sg is generated, and also updating the duration manually anytime a Transform occurs that affects the size of the time axis (x-axis)
{% raw %}
inp.duration, out1.duration, out5.duration
{% endraw %} {% raw %}
settings_match = [orig_settings[key] == out1.settings[key] for key in orig_settings.keys() if key != "transformer"]
assert False not in settings_match
test_close(out1.width, int((1/inp.duration)*inp.width), eps=1.01)
test_close(out2.width, int((2/inp.duration)*inp.width), eps=1.01)
test_close(out5.width, int((5/inp.duration)*inp.width), eps=1.01)
{% endraw %} {% raw %}
# test AudioToSpec->CropTime and CropSignal->AudioToSpec will result in same size images
oa = OpenAudio(files)
crop_dur = random.randint(1000,5000)
DBMelSpec = SpectrogramTransformer(mel=True, to_db=True)
pipe_cropsig  = Pipeline([oa, DBMelSpec(hop_length=128), CropTime(crop_dur)])
pipe_cropspec = Pipeline([oa, CropSignal(crop_dur), DBMelSpec(hop_length=128), ])
for i in range(50):
    test_eq(pipe_cropsig(i).width, pipe_cropspec(i).width)
{% endraw %} {% raw %}
# test pad_mode zeros-after by verifying sg ends with zeros and begins with non-zeros
sg_orig = GenExample.sg_sc()
crop_5000ms = CropTime(5000, pad_mode=AudioPadType.Zeros_After)
crop_5000ms(sg_orig)
test_eq(sg_orig[:,:,-1], torch.zeros_like(sg_orig)[:,:,-1])
test_ne(sg_orig[:,:,0], torch.zeros_like(sg_orig)[:,:,-1])
test_eq(sg_orig.duration, 5)
{% endraw %} {% raw %}
# test pad_mode repeat by making sure that columns are equal at the appropriate offsets
crop_12000ms_repeat = CropTime(12000, pad_mode=AudioPadType.Repeat)
inp,out = show_transform(crop_12000ms_repeat, GenExample.sg_sc, show=True)
for i in range(inp.width):
    test_eq(out[:,:,i], out[:,:,i+inp.width])
    test_eq(out[:,:,i], out[:,:,i+2*inp.width])
{% endraw %} {% raw %}
# test bad pad_mode doesnt fail silently, correct is 'zeros_after'
test_fail(CropTime(12000, pad_mode="zerosafter"))
{% endraw %} {% raw %}
out1.shape, out2.shape, out5.shape
{% endraw %} {% raw %}
# demonstrate on multichannel audio, (change "show=None" to "show=True" to see)
inp,out = show_transform(crop_1000ms, GenExample.sg_mc, show=None)
{% endraw %}

CropTime Timing Tests

{% raw %}
sg_orig = GenExample.sg_sc()
{% endraw %} {% raw %}
%%time
#1s zero-padded crop
out = crop_1000ms(sg_orig)
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 57.9 µs
{% endraw %} {% raw %}
%%time
#5s zero-padded crop
out = crop_5000ms(sg_orig)
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 171 µs
{% endraw %} {% raw %}
%%time
#12s repeat-padded crop
out = crop_12000ms_repeat(sg_orig)
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 260 µs
{% endraw %}
If we wanted to, we could make a class for these transforms that keeps the masked portion as state, so that we could write a decodes method to go back to the original

Time and Frequency Masking (SpecAugment)

{% raw %}
{% endraw %} {% raw %}

MaskFreq[source]

MaskFreq(num_masks=1, size=20, start=None, val=None, **kwargs)

{% endraw %}
Passing around the settings manually is already fairly clunky, but it is especially bad when we have to do it twice as MaskTime hands off to MaskFrequency. We should maybe make a copy of the AudioSpectrogram and then alter the tensor for its sg, rather than cloning out the sg and then building a new object at the end. Or we could just keep a reference to the parent tensor, pass that along, and have getattr recurse through the parents looking for settings
{% raw %}
{% endraw %} {% raw %}

MaskTime[source]

MaskTime(num_masks=1, size=20, start=None, val=None, **kwargs)

{% endraw %} {% raw %}
sg_orig = GenExample.sg_sc()
{% endraw %} {% raw %}
# show the default frequency mask (SpecAugment) on a single-channel spectrogram
inp,out = show_transform(MaskFreq(), GenExample.sg_sc)
{% endraw %} {% raw %}
# show the default time mask on a single-channel spectrogram
inp,out = show_transform(MaskTime(), GenExample.sg_sc)
{% endraw %} {% raw %}
# create a random frequency mask and test that it is being correctly applied
size, start, val = [random.randint(1, 50) for i in range(3)]
freq_mask_test = MaskFreq(size=size, start=start, val=val)
inp,out = show_transform(freq_mask_test, GenExample.sg_sc, show=None)
out.show()
# the masked band of frequency bins should equal `val` everywhere
test_eq(out[:,start:start+size,:], val*torch.ones_like(inp)[:,start:start+size,:])
{% endraw %} {% raw %}
# create a random time mask and test that it is being correctly applied
size, start, val = [random.randint(1, 50) for i in range(3)]
time_mask_test = MaskTime(size=size, start=start, val=val)
inp,out = show_transform(time_mask_test, GenExample.sg_sc, show=None)
out.show()
# the masked span of time steps should equal `val` everywhere
test_eq(out[:,:,start:start+size], val*torch.ones_like(inp)[:,:,start:start+size])
{% endraw %} {% raw %}
# demonstrate MaskFreq on multichannel audio, (change "show=None" to "show=True" to see)
inp,out = show_transform(MaskFreq(size=40), GenExample.sg_mc, show=None)
{% endraw %}

SpecAugment Timing Tests

{% raw %}
sg_orig = GenExample.sg_sc()
freq_mask = MaskFreq()
time_mask = MaskTime()
{% endraw %} {% raw %}
%%time
out = freq_mask(sg_orig)
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 432 µs
{% endraw %} {% raw %}
sg_orig = GenExample.sg_sc()
{% endraw %} {% raw %}
%%time
# time masking ~90µs slower because we transpose, delegate to MaskFreq, and transpose back;
# we could fix this at the expense of a bit more code
out = time_mask(sg_orig)
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 525 µs
{% endraw %} {% raw %}
sg_mc = GenExample.sg_mc()
{% endraw %} {% raw %}
%%time
# same frequency mask applied to a multichannel sg
out = freq_mask(sg_mc)
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 409 µs
{% endraw %}

Spectrogram Rolling

{% raw %}
{% endraw %} {% raw %}

SGRoll[source]

SGRoll(max_shift_pct=0.5, direction=0, **kwargs)

Shifts spectrogram along x-axis wrapping around to other side

{% endraw %} {% raw %}
# show SGRoll (shift along the time axis, wrapping around) on a single-channel sg
inp,out = show_transform(SGRoll(), GenExample.sg_sc)
{% endraw %} {% raw %}
# demonstrate on multichannel audio, (change "show=None" to "show=True" to see)
inp,out = show_transform(SGRoll(), GenExample.sg_mc, show=None)
{% endraw %}

SGRoll Timing Tests

{% raw %}
# single- and multichannel examples plus a default roller for the timing runs below
sg_orig = GenExample.sg_sc()
sg_multi = GenExample.sg_mc()
roller = SGRoll()
{% endraw %} {% raw %}
%%time
# roll a single-channel spectrogram
out = roller(sg_orig)
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 936 µs
{% endraw %} {% raw %}
%%time
# roll a multichannel spectrogram
out = roller(sg_multi)
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 1.48 ms
{% endraw %}

Delta/Accelerate

TO-DO: Test delta as part of a pipeline to make sure SpecAugment/roll/interpolate...etc are working on multichannel
{% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}

Delta[source]

Delta(width=9)

{% endraw %} {% raw %}
delta = Delta()
{% endraw %} {% raw %}
inp, out = show_transform(delta, GenExample.sg_sc)
#nchannels for a spectrogram is how many channels its original audio had
test_eq(out.nchannels, inp.nchannels)
# height/width are preserved; only the leading (channel-like) dim changes
test_eq(out.shape[1:], inp.shape[1:])
# the added delta channel must differ from the raw sg channel
test_ne(out[0],out[1])
{% endraw %} {% raw %}
# demonstrate delta on multichannel audio, (change "show=None" to "show=True" to see)
inp,out = show_transform(delta, GenExample.sg_mc, show=None)
{% endraw %}

Delta Timing Tests

{% raw %}
sg_orig = GenExample.sg_sc()
sg_mc = GenExample.sg_mc()
{% endraw %} {% raw %}
%%time
# time Delta on a single-channel sg
out = delta(sg_orig)
CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 2.59 ms
{% endraw %} {% raw %}
%%time
# time Delta on a multichannel sg
out = delta(sg_mc)
CPU times: user 28 ms, sys: 0 ns, total: 28 ms
Wall time: 7.39 ms
{% endraw %}

Image resizing

This should probably be refactored to use vision's resize transform since it already exists
{% raw %}
{% endraw %} {% raw %}

TfmResize[source]

TfmResize(size, interp_mode='bilinear', **kwargs)

Temporary fix to allow image resizing transform

{% endraw %} {% raw %}
# Test when size is an int
size = 224
resize_int = TfmResize(size)
inp, out = show_transform(resize_int, GenExample.sg_sc, show=None)
print("Original Shape: ", inp.shape)
print("Resized Shape :" , out.shape)
# an int size produces a square (size x size) output
test_eq(out.shape[1:], torch.Size([size,size]))
{% endraw %} {% raw %}
# Test when size is a tuple with unequal values
size_tup=(124,581)
resize_tup = TfmResize(size_tup)
inp, out = show_transform(resize_tup, GenExample.sg_sc, show=None)
print("Original Shape: ", inp.shape)
print("Resized Shape :" , out.shape)
# a (height, width) tuple is honored exactly
test_eq(out.shape[1:], torch.Size(size_tup))
{% endraw %} {% raw %}
# demonstrate resizing on multichannel sg, (change "show=None" to "show=True" to see)
resizer = TfmResize((200,100))
inp,out = show_transform(resizer, GenExample.sg_mc, show=None)
print("Original Shape: ", inp.shape)
print("Resized Shape :" , out.shape)
{% endraw %}

Resize Timing Tests

{% raw %}
# resizer and fresh examples for the timing runs below
resizer = TfmResize(224)
sg_orig = GenExample.sg_sc()
sg_mc = GenExample.sg_mc()
{% endraw %} {% raw %}
%%time
# resize a single-channel sg to 224x224
out = resizer(sg_orig)
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 741 µs
{% endraw %} {% raw %}
%%time
# resize a multichannel sg
out = resizer(sg_mc)
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.1 ms
{% endraw %}

Pipelines

Signal Pipelines

{% raw %}
# display the list of audio files the pipelines below index into
files
{% endraw %} {% raw %}
# OpenAudio is called with an integer index by the pipelines below (e.g. pipe(i))
oa = OpenAudio(files); oa
{% endraw %}

Signal Pipelines

{% raw %}
#Show simple preprocessing that trims silence, crops to 2s, and resamples to 4000hz (low-quality)
preprocess_pipe = Pipeline([oa, RemoveSilence(), CropSignal(2000), Resample(4000)])
for i in range(3): preprocess_pipe(i).show()
{% endraw %} {% raw %}
#Show a very noisy set of signal augmentations
augment_pipe1 = Pipeline([oa, RemoveSilence(), CropSignal(2000), AddNoise(noise_level=0.3), SignalLoss()])
for i in range(3): augment_pipe1(i).show()
{% endraw %}
Bug: Signal Cutout does not appear to be working in this pipeline
{% raw %}
#Show another set of signal augmentations
augment_pipe2 = Pipeline([oa, RemoveSilence(), CropSignal(2000), AddNoise(color=NoiseColor.Blue), 
                          SignalShifter(roll=True), SignalCutout()])
# NOTE(review): SignalCutout does not appear to alter the output in this pipeline — verify
for i in range(3): augment_pipe2(i).show()
{% endraw %}

Spectrogram Pipelines

{% raw %}
#Basic melspectrogram pipe with advanced SpecAugment 
sg_cfg = AudioConfig.BasicMelSpectrogram(hop_length=256, n_fft=2048)
pipe = Pipeline([oa, AudioToSpec.from_cfg(sg_cfg), CropTime(2000), MaskTime(num_masks=2, size=4), MaskFreq()])
for i in range(5): pipe.show(pipe(i))
{% endraw %} {% raw %}
#Pipe with only spectrogram transforms, notably Delta/Accelerate appended
voice_cfg = AudioConfig.Voice()
delta_pipe = Pipeline([oa, AudioToSpec.from_cfg(voice_cfg), CropTime(2000), Delta(), MaskTime(size=4), MaskFreq()])
for i in range(5): delta_pipe.show(delta_pipe(i))
{% endraw %} {% raw %}
# NOTE(review): this re-displays the first pipe — looks like a stale duplicate cell
for i in range(5): pipe.show(pipe(i))
{% endraw %} {% raw %}
#Pipe with signal and spectro transforms, and a lot of noise
voice_cfg = AudioConfig.Voice()
everything_pipe = Pipeline([oa, 
                            RemoveSilence(), CropSignal(2000), AddNoise(noise_level=0.3), SignalLoss(), 
                            AudioToSpec.from_cfg(voice_cfg), MaskTime(size=4), MaskFreq(), Delta()])
for i in range(5): everything_pipe.show(everything_pipe(i))
{% endraw %}

Export